package com.niit.flume;

import org.apache.flume.Context;
import org.apache.flume.Event;
import org.apache.flume.interceptor.Interceptor;

import java.util.ArrayList;
import java.util.List;
import java.util.Map;

public class TypeInterceptor implements Interceptor {

    //声明一个存放事件的集合
    private List<Event> addHeaderEvents;

    //初始化方法
    @Override
    public void initialize() {
        //初始化存放事件的集合
        addHeaderEvents = new ArrayList<>();
    }

    //单个事件（消息 数据）拦截
    @Override
    public Event intercept(Event event) {
        //1.获取事件中的头信息 header
        Map<String, String> headers = event.getHeaders();
        //2.获取事件中的体信息 body
        byte[] body = event.getBody();
        String newBody = new String(body);
        //3.根据 body中是否含有MYFLUE来决定添加怎么样的头信息
        if(newBody.contains("MYFLUME")){
            //添加头信息
            headers.put("type","first");
        }else{
            headers.put("type","second");
        }
        //4.返回事件
        return event;
    }

    //批量事件（消息 数据）拦截
    @Override
    public List<Event> intercept(List<Event> list) {
        //1.清空集合
        addHeaderEvents.clear();
        //2.遍历 集合 List<Event> list
        for (Event event :list) {
            Event newEvent = intercept(event);
            addHeaderEvents.add(newEvent);
        }
        //返回结果
        return addHeaderEvents;
    }

    //关闭方法
    @Override
    public void close() {

    }

    public static class Builder implements Interceptor.Builder{

        @Override
        public Interceptor build() {
            return new TypeInterceptor();
        }

        @Override
        public void configure(Context context) {

        }
    }

}
